- 
                Notifications
    You must be signed in to change notification settings 
- Fork 8
#862 much faster version of aggregate_15_min_mvt #863
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
#862 much faster version of aggregate_15_min_mvt #863
Conversation
| Here is an example just for a single day (only testing select, not update): ran in 8s: 
 WITH temp AS (
    -- Cross product of dates, intersections, legal movement for cars, bikes, and peds to aggregate
    SELECT
        im.intersection_uid,
        dt.datetime_bin,
        im.classification_uid,
        im.leg,
        im.movement_uid,
        0 AS volume
    FROM miovision_api.intersection_movements AS im
    CROSS JOIN generate_series(
        '2024-02-07'::date,
        '2024-02-08'::date - interval '15 minutes',
        interval '15 minutes'
    ) AS dt(datetime_bin)
    WHERE
        --0 padding for certain modes (padding)
        im.classification_uid IN (1,2,6,10)
        --AND im.intersection_uid = ANY(target_intersections)
        
    UNION ALL
    
    --real volumes
    SELECT
        v.intersection_uid,
        datetime_bin_15(v.datetime_bin) AS datetime_bin,
        v.classification_uid,
        v.leg,
        v.movement_uid,
        SUM(volume)
    FROM miovision_api.volumes AS v
    --only aggregate common movements
    JOIN miovision_api.intersection_movements USING (
        intersection_uid, classification_uid, leg, movement_uid
    )
    WHERE
        v.datetime_bin >= '2024-02-07'::date
        AND v.datetime_bin < '2024-02-08'::date
        --AND v.intersection_uid = ANY(target_intersections)
    GROUP BY
        v.intersection_uid,
        datetime_bin_15(v.datetime_bin),
        v.classification_uid,
        v.leg,
        v.movement_uid
),
aggregate_insert AS (
    /*INSERT INTO miovision_api.volumes_15min_mvt(
        intersection_uid, datetime_bin, classification_uid, leg, movement_uid, volume
    )*/
    SELECT DISTINCT ON (
        v.intersection_uid, v.datetime_bin, v.classification_uid, v.leg, v.movement_uid
    )
        v.intersection_uid,
        v.datetime_bin,
        v.classification_uid,
        v.leg,
        v.movement_uid,
        CASE
            --set unacceptable gaps as nulls
            WHEN un.datetime_bin IS NOT NULL THEN NULL
            --gap fill with zeros (restricted to certain modes in temp CTE)
            ELSE v.volume
        END AS volume
    FROM temp AS v
    JOIN miovision_api.intersections AS i USING (intersection_uid)
    --set unacceptable gaps as null
    LEFT JOIN miovision_api.unacceptable_gaps AS un USING (
        intersection_uid, datetime_bin
    )
    WHERE
        -- Only include dates during which intersection is active 
        -- (excludes entire day it was added/removed)
        v.datetime_bin >= i.date_installed + interval '1 day'
        AND (
            i.date_decommissioned IS NULL
            OR (v.datetime_bin < i.date_decommissioned - interval '1 day')
        )
    ORDER BY 
        v.intersection_uid,
        v.datetime_bin,
        v.classification_uid,
        v.leg,
        v.movement_uid,
        --select real value instead of padding value if available
        v.volume DESC
)
SELECT COUNT(*), SUM(volume), 'new' AS source FROM aggregate_insert
UNION
SELECT COUNT(*), SUM(volume), 'old' AS source FROM miovision_api.volumes_15min_mvt WHERE datetime_bin >= '2024-02-07'::date AND datetime_bin < '2024-02-08'::date And this select for 1 month, 2 intersections ran in 6s!!! 
 WITH temp AS (
    -- Cross product of dates, intersections, legal movement for cars, bikes, and peds to aggregate
    SELECT
        im.intersection_uid,
        dt.datetime_bin,
        im.classification_uid,
        im.leg,
        im.movement_uid,
        0 AS volume
    FROM miovision_api.intersection_movements AS im
    CROSS JOIN generate_series(
        '2024-01-07'::date,
        '2024-02-08'::date - interval '15 minutes',
        interval '15 minutes'
    ) AS dt(datetime_bin)
    WHERE
        --0 padding for certain modes (padding)
        im.classification_uid IN (1,2,6,10)
        AND im.intersection_uid = ANY(ARRAY[12,25]::integer [])
        
    UNION ALL
    
    --real volumes
    SELECT
        v.intersection_uid,
        datetime_bin_15(v.datetime_bin) AS datetime_bin,
        v.classification_uid,
        v.leg,
        v.movement_uid,
        SUM(volume)
    FROM miovision_api.volumes AS v
    --only aggregate common movements
    JOIN miovision_api.intersection_movements USING (
        intersection_uid, classification_uid, leg, movement_uid
    )
    WHERE
        v.datetime_bin >= '2024-01-07'::date
        AND v.datetime_bin < '2024-02-08'::date
        AND v.intersection_uid = ANY(ARRAY[12,25]::integer [])
    GROUP BY
        v.intersection_uid,
        datetime_bin_15(v.datetime_bin),
        v.classification_uid,
        v.leg,
        v.movement_uid
),
aggregate_insert AS (
    /*INSERT INTO miovision_api.volumes_15min_mvt(
        intersection_uid, datetime_bin, classification_uid, leg, movement_uid, volume
    )*/
    SELECT DISTINCT ON (
        v.intersection_uid, v.datetime_bin, v.classification_uid, v.leg, v.movement_uid
    )
        v.intersection_uid,
        v.datetime_bin,
        v.classification_uid,
        v.leg,
        v.movement_uid,
        CASE
            --set unacceptable gaps as nulls
            WHEN un.datetime_bin IS NOT NULL THEN NULL
            --gap fill with zeros (restricted to certain modes in temp CTE)
            ELSE v.volume
        END AS volume
    FROM temp AS v
    JOIN miovision_api.intersections AS i USING (intersection_uid)
    --set unacceptable gaps as null
    LEFT JOIN miovision_api.unacceptable_gaps AS un USING (
        intersection_uid, datetime_bin
    )
    WHERE
        -- Only include dates during which intersection is active 
        -- (excludes entire day it was added/removed)
        v.datetime_bin >= i.date_installed + interval '1 day'
        AND (
            i.date_decommissioned IS NULL
            OR (v.datetime_bin < i.date_decommissioned - interval '1 day')
        )
    ORDER BY 
        v.intersection_uid,
        v.datetime_bin,
        v.classification_uid,
        v.leg,
        v.movement_uid,
        --select real value instead of padding value if available
        v.volume DESC
)
SELECT COUNT(*), SUM(volume), 'new' AS source FROM aggregate_insert
UNION
SELECT COUNT(*), SUM(volume), 'old' AS source FROM miovision_api.volumes_15min_mvt
WHERE datetime_bin >= '2024-01-07'::date AND datetime_bin < '2024-02-08'::date 
AND intersection_uid = ANY(ARRAY[12,25]::integer []) | 
| OR (v.datetime_bin < i.date_decommissioned - interval '1 day') | ||
| ) | ||
| --exclude movements already aggregated | ||
| AND v.volume_15min_mvt_uid IS NULL | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this excluded cause we have the clearing function?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes exactly! it is redundant - we always run clear + aggregate together whether through run-api or airflow.
| I tested it on a day with unacceptable gaps and the result is different comparing to the data in 15_mvt. I am wondering if the following worked previously with the old function as intended cause I dont see Nulls for unacceptable gaps in the 15_mvt table. I tested with the date   | 
| Here's the test you ran last week @chmnata which gave suspicious results along with my investigation ✅ 
 --create a temp table to store and compare testing results
create table gwolofs.miovision_Test_null as
WITH temp AS (
    -- Cross product of dates, intersections, legal movement for cars, bikes, and peds to aggregate
    SELECT
        im.intersection_uid,
        dt.datetime_bin,
        im.classification_uid,
        im.leg,
        im.movement_uid,
        0 AS volume
    FROM miovision_api.intersection_movements AS im
    CROSS JOIN generate_series(
        '2024-01-02'::date,
        '2024-01-03'::date - interval '15 minutes',
        interval '15 minutes'
    ) AS dt(datetime_bin)
    WHERE
        --0 padding for certain modes (padding)
        im.classification_uid IN (1,2,6,10)
        --AND im.intersection_uid = ANY(target_intersections)
        
    UNION ALL
    
    --real volumes
    SELECT
        v.intersection_uid,
        datetime_bin_15(v.datetime_bin) AS datetime_bin,
        v.classification_uid,
        v.leg,
        v.movement_uid,
        SUM(volume)
    FROM miovision_api.volumes AS v
    --only aggregate common movements
    JOIN miovision_api.intersection_movements USING (
        intersection_uid, classification_uid, leg, movement_uid
    )
    WHERE
 v.datetime_bin >= '2024-01-02'::date
        AND v.datetime_bin < '2024-01-03'::date
        --AND v.intersection_uid = ANY(target_intersections)
    GROUP BY
        v.intersection_uid,
        datetime_bin_15(v.datetime_bin),
        v.classification_uid,
        v.leg,
        v.movement_uid
)
    /*INSERT INTO miovision_api.volumes_15min_mvt(
        intersection_uid, datetime_bin, classification_uid, leg, movement_uid, volume
    )*/
    SELECT DISTINCT ON (
        v.intersection_uid, v.datetime_bin, v.classification_uid, v.leg, v.movement_uid
    )
        v.intersection_uid,
        v.datetime_bin,
        v.classification_uid,
        v.leg,
        v.movement_uid,
        CASE
            --set unacceptable gaps as nulls
            WHEN un.datetime_bin IS NOT NULL THEN NULL
            --gap fill with zeros (restricted to certain modes in temp CTE)
            ELSE v.volume
        END AS volume
    FROM temp AS v
    JOIN miovision_api.intersections AS i USING (intersection_uid)
    --set unacceptable gaps as null
    LEFT JOIN miovision_api.unacceptable_gaps AS un USING (
        intersection_uid, datetime_bin
    )
    WHERE
        -- Only include dates during which intersection is active 
        -- (excludes entire day it was added/removed)
        v.datetime_bin >= i.date_installed + interval '1 day'
        AND (
            i.date_decommissioned IS NULL
            OR (v.datetime_bin < i.date_decommissioned - interval '1 day')
        )
    ORDER BY 
        v.intersection_uid,
        v.datetime_bin,
        v.classification_uid,
        v.leg,
        v.movement_uid,
        --select real value instead of padding value if available
        v.volume DESC NULLS LAST
GRANT ALL ON gwolofs.miovision_Test_null TO natalie, dbadmin;
--the 44 records that do not match are all for intersection_uid = 60 AND datetime_bin = '2024-01-02 23:45:00'.
--These records are zero in the old result and null in the new result. The record is in unacceptable_ranges, so it should be null. 
SELECT intersection_uid, datetime_bin, classification_uid, leg, movement_uid, volume
FROM miovision_api.volumes_15min_mvt as v
WHERE v.datetime_bin >= '2024-01-02'::date
AND v.datetime_bin < '2024-01-03'::date
EXCEPT
SELECT * from gwolofs.miovision_Test_null
WHERE intersection_uid NOT IN (67, 68) --these intersections were not aggregated in the old dag run from 01-02 because they were only added to intersection_movements later
--the 44 records that do not match are all for intersection_uid = 60 AND datetime_bin = '2024-01-02 23:45:00'.
--These records are null in the new result and zero in the old result. The record is in unacceptable_ranges, so it should be null. 
SELECT * from gwolofs.miovision_Test_null
WHERE intersection_uid NOT IN (67, 68) --these intersections were not aggregated in the old dag run from 01-02 because they were only added to intersection_movements later
EXCEPT
SELECT intersection_uid, datetime_bin, classification_uid, leg, movement_uid, 
volume 
FROM miovision_api.volumes_15min_mvt as v
WHERE v.datetime_bin >= '2024-01-02'::date AND v.datetime_bin < '2024-01-03'::date
--correctly get nulls for this case when re-aggregating using old function:
    SELECT
        im.intersection_uid,
        dt.datetime_bin,
        im.classification_uid,
        im.leg,
        im.movement_uid,
        CASE
            --set unacceptable gaps as nulls
            WHEN un.datetime_bin IS NOT NULL THEN NULL
            --gap fill with zeros (restricted to certain modes in having clause)
            ELSE (COALESCE(SUM(v.volume), 0))
        END AS volume
    -- Cross product of dates, intersections, legal movement for cars, bikes, and peds to aggregate
    FROM miovision_api.intersection_movements AS im
    CROSS JOIN generate_series(
        '2024-01-02 23:45:00'::timestamp,
        '2024-01-03'::timestamp - interval '15 minutes',
        interval '15 minutes'
    ) AS dt(datetime_bin)
    JOIN miovision_api.intersections AS mai USING (intersection_uid)     
    --To avoid aggregating unacceptable gaps
    LEFT JOIN miovision_api.unacceptable_gaps AS un ON
        un.intersection_uid = im.intersection_uid
        --remove the 15 minute bin containing any unacceptable gaps
        AND dt.datetime_bin = un.datetime_bin
    --To get 1min bins
    LEFT JOIN miovision_api.volumes AS v ON
        --help query choose correct partition
        v.datetime_bin = '2024-01-02 23:45:00'::timestamp
        AND v.datetime_bin >= dt.datetime_bin
        AND v.datetime_bin < dt.datetime_bin + interval '15 minutes'
        AND v.intersection_uid = im.intersection_uid
        AND v.classification_uid = im.classification_uid
        AND v.leg = im.leg
        AND v.movement_uid = im.movement_uid
    WHERE
        -- Only include dates during which intersection is active 
        -- (excludes entire day it was added/removed)
        dt.datetime_bin > mai.date_installed + interval '1 day'
        AND (
            mai.date_decommissioned IS NULL
            OR (dt.datetime_bin < mai.date_decommissioned - interval '1 day')
        )
        --exclude movements already aggregated
        AND v.volume_15min_mvt_uid IS NULL
        AND im.intersection_uid = ANY(ARRAY[46]::integer[])
    GROUP BY
        im.intersection_uid,
        dt.datetime_bin,
        im.classification_uid,
        im.leg,
        im.movement_uid, 
        un.datetime_bin
    HAVING
        --retain 0s for certain modes (padding)
        im.classification_uid IN (1,2,6,10)
        OR SUM(v.volume) > 0 
 | 
| 
 Thanks for the investigation! Do you know how much of the data currently in 15_mvt will need to be updated due to changed unacceptable gap? | 
| Here are 12 dates with records that should be nulls but are not. You were lucky to pick one of them at random! I will create a new issue for this investigation. 
 SELECT datetime_bin::date, COUNT(*)
FROM miovision_api.volumes_15min_mvt AS v
JOIN miovision_api.unacceptable_gaps AS un USING (intersection_uid, datetime_bin)
WHERE v.volume IS NOT NULL
GROUP BY 1
ORDER BY 1 DESC | 
| This new year is giving me the luck we needed!!!!! 🧧 | 
| 
 We'll fix all the downstream tables as well by running  | 
| updated in the database | 
| This aggregation today took 5 minutes instead of 6 hours like above! 🥳 only 70x faster in the end, not 3000. | 
What this pull request accomplishes:
Issue(s) this solves:
What, in particular, needs to reviewed:
What needs to be done by a sysadmin after this PR is merged